/***
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package spider.url.bf;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;

import billing.system.logger.Logger;

public class BloomFilter extends Filter {
	private static final Logger logger = Logger
			.getLogger(BloomFilter.class);
	static ICompactSerializer<BloomFilter> serializer_ = new BloomFilterSerializer();
	private static final int EXCESS = 20;
	public static ICompactSerializer<BloomFilter> serializer() {
		return serializer_;
	}

	private BitSet filter_;

	BloomFilter(int hashes, BitSet filter) {
		hashCount = hashes;
		filter_ = filter;
	}

	private static BitSet bucketsFor(long numElements, int bucketsPer) {
		long numBits = numElements * bucketsPer + EXCESS;
		return new BitSet((int) Math.min(Integer.MAX_VALUE, numBits));
	}

	/***
	 * Calculates the maximum number of buckets per element that this
	 * implementation can support. Crucially, it will lower the bucket count if
	 * necessary to meet BitSet's size restrictions.
	 */
	private static int maxBucketsPerElement(long numElements) {
		numElements = Math.max(1, numElements);
		double v = (Integer.MAX_VALUE - EXCESS) / (double) numElements;
		if (v < 1.0) {
			throw new UnsupportedOperationException(
					"Cannot compute probabilities for " + numElements
							+ " elements.");
		}
		return Math.min(BloomCalculations.probs.length - 1, (int) v);
	}

	/***
	 * @return A BloomFilter with the lowest practical false positive
	 *         probability for the given number of elements.
	 */
	public static BloomFilter getFilter(long numElements,
			int targetBucketsPerElem) {
		int maxBucketsPerElement = Math.max(1,
				maxBucketsPerElement(numElements));
		int bucketsPerElement = Math.min(targetBucketsPerElem,
				maxBucketsPerElement);
		if (bucketsPerElement < targetBucketsPerElem) {
			logger.warn(String
					.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).",
							numElements, bucketsPerElement,
							targetBucketsPerElem));
		}
		BloomCalculations.BloomSpecification spec = BloomCalculations
				.computeBloomSpec(bucketsPerElement);
		return new BloomFilter(spec.K, bucketsFor(numElements,
				spec.bucketsPerElement));
	}

	/***
	 * @return The smallest BloomFilter that can provide the given false
	 *         positive probability rate for the given number of elements.
	 * 
	 *         Asserts that the given probability can be satisfied using this
	 *         filter.
	 */
	public static BloomFilter getFilter(long numElements,
			double maxFalsePosProbability) {
		assert maxFalsePosProbability <= 1.0 : "Invalid probability";
		int bucketsPerElement = maxBucketsPerElement(numElements);
		BloomCalculations.BloomSpecification spec = BloomCalculations
				.computeBloomSpec(bucketsPerElement, maxFalsePosProbability);
		return new BloomFilter(spec.K, bucketsFor(numElements,
				spec.bucketsPerElement));
	}

	public void clear() {
		filter_.clear();
	}

	int buckets() {
		return filter_.size();
	}

	BitSet filter() {
		return filter_;
	}

	public boolean isPresent(String src) {
		ByteBuffer key = ByteBuffer.wrap(src.getBytes());
		for (int bucketIndex : getHashBuckets(key)) {
			if (!filter_.get(bucketIndex)) {
				return false;
			}
		}
		return true;
	}

	/**
	 * @param key
	 *            -- value whose hash is used to fill the filter_. This is a
	 *            general purpose API.
	 */
	public void add(String src) {
		ByteBuffer key = ByteBuffer.wrap(src.getBytes());
		for (int bucketIndex : getHashBuckets(key)) {
			filter_.set(bucketIndex);
		}
	}

	public String toString() {
		return filter_.toString();
	}

	@SuppressWarnings("rawtypes")
	ICompactSerializer tserializer() {
		return serializer_;
	}

	int emptyBuckets() {
		int n = 0;
		for (int i = 0; i < buckets(); i++) {
			if (!filter_.get(i)) {
				n++;
			}
		}
		return n;
	}

	/*** @return a BloomFilter that always returns a positive match, for testing */
	public static BloomFilter alwaysMatchingBloomFilter() {
		BitSet set = new BitSet(64);
		set.set(0, 64);
		return new BloomFilter(1, set);
	}
}

class BloomFilterSerializer implements ICompactSerializer<BloomFilter> {
	public void serialize(BloomFilter bf, DataOutputStream dos)
			throws IOException {
		dos.writeInt(bf.getHashCount());
		BitSetSerializer.serialize(bf.filter(), dos);
	}

	public BloomFilter deserialize(DataInputStream dis) throws IOException {
		int hashes = dis.readInt();
		BitSet bs = BitSetSerializer.deserialize(dis);
		return new BloomFilter(hashes, bs);
	}
}